/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.demos;

import java.io.Serializable;
import java.util.Random;

import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

public class LoadTest implements MembershipListener, ChannelListener, Runnable {
    /** Logger used for send/receive statistics and error reporting. */
    private static final Log log = LogFactory.getLog(LoadTest.class);
    /** Length of the shared payload array allocated by {@link LoadMessage}; {@code main} overwrites it from -size. */
    public static int size = 24000;
    /** Monitor that {@code run()} waits on while it cannot send; {@code memberAdded} notifies all waiters. */
    public static final Object mutex = new Object();
    /** NOTE(review): not read or written by any method in this file - confirm whether external code uses it. */
    public boolean doRun = true;

    /** Sum of the payload lengths of all messages received by this instance. */
    public long bytesReceived = 0;
    /** Received payload volume expressed in MBytes (bytes / 1024 / 1024). */
    public float mBytesReceived = 0;
    /** Number of messages received by this instance. */
    public int messagesReceived = 0;
    /** When {@code false}, {@code run()} only waits on {@link #mutex} and never sends. */
    public boolean send = true;
    /** Enables payload dumps to stdout and more verbose error logging. */
    public boolean debug = false;
    /** Bound of the send loop in {@code run()}; receive statistics are also printed when this count is reached. */
    public int msgCount = 100;
    /** Channel used to query members and send messages. */
    ManagedChannel channel = null;
    /** Statistics are logged each time the message counter is a multiple of this value. */
    public int statsInterval = 10000;
    /** Milliseconds to sleep after each send; no sleep when not positive. */
    public long pause = 0;
    /** When {@code true}, a {@link ChannelException} during send is rethrown and ends the send loop. */
    public boolean breakonChannelException = false;
    /** NOTE(review): not read or written by any method in this file - confirm whether external code uses it. */
    public boolean async = false;
    /** Timestamp (ms) of the first received message; 0 until a message arrives. */
    public long receiveStart = 0;
    /** Send options passed to {@code channel.send}. */
    public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;

    /** Payload length used for send statistics; set by {@code main} from a {@link LoadMessage}. */
    static int messageSize = 0;

    /** Global number of sent messages, accumulated through {@code addSendStats}. */
    public static long messagesSent = 0;
    /** Timestamp (ms) recorded by {@code startTest} while the value is still 0. */
    public static long messageStartSendTime = 0;
    /** Timestamp (ms) recorded by {@code endTest} when the thread count drops back to 0. */
    public static long messageEndSendTime = 0;
    /** Number of threads that called {@code startTest} and have not yet called {@code endTest}. */
    public static int threadCount = 0;

    /**
     * Registers one more active sender thread and, if no start time has been recorded yet, stores the current
     * wall-clock time as the start of sending.
     */
    public static synchronized void startTest() {
        final boolean startNotRecorded = (messageStartSendTime == 0);
        threadCount += 1;
        if (startNotRecorded) {
            messageStartSendTime = System.currentTimeMillis();
        }
    }

    /**
     * Unregisters a sender thread. When the last active thread leaves and no end time has been stored yet, the
     * current wall-clock time is recorded as the end of sending.
     */
    public static synchronized void endTest() {
        threadCount -= 1;
        if (threadCount != 0 || messageEndSendTime != 0) {
            return;
        }
        messageEndSendTime = System.currentTimeMillis();
    }


    /**
     * Adds a per-thread message count to the global {@link #messagesSent} total.
     *
     * @param count number of messages to add to the global total
     *
     * @return always {@code 0}, so that a caller can assign the result to its local counter to reset it
     */
    public static synchronized long addSendStats(long count) {
        messagesSent = messagesSent + count;
        return 0L;
    }

    /**
     * Logs the send statistics for the given number of messages.
     * <p>
     * The byte total is computed with {@code long} arithmetic and the rates with {@code double}; {@code float}
     * only carries about seven significant digits and produced rounded totals for large message counts.
     *
     * @param counter     number of messages sent so far
     * @param messageSize payload size of a single message in bytes
     */
    private static void printSendStats(long counter, int messageSize) {
        long totalBytes = counter * messageSize;
        // Elapsed time since the first sender thread started; a zero duration yields Infinity/NaN rates, not an error
        double seconds = (System.currentTimeMillis() - messageStartSendTime) / 1000d;
        double bytesPerSecond = totalBytes / seconds;
        log.info("****SEND STATS-" + Thread.currentThread().getName() + "*****" + "\n\tMessage count:" + counter +
                "\n\tTotal bytes  :" + totalBytes + "\n\tTotal seconds:" + seconds + "\n\tBytes/second :" +
                bytesPerSecond + "\n\tMBytes/second:" + (bytesPerSecond / 1024d / 1024d));
    }


    /**
     * Creates a load tester that works against the given channel.
     *
     * @param channel   channel used to look up members and send messages
     * @param send      {@code true} if this instance should send messages
     * @param msgCount  message count used as the bound of the send loop
     * @param debug     {@code true} to enable debug output
     * @param pause     milliseconds to sleep after each send
     * @param stats     number of messages between two statistics reports
     * @param breakOnEx {@code true} to abort sending when a {@link ChannelException} occurs
     */
    public LoadTest(ManagedChannel channel, boolean send, int msgCount, boolean debug, long pause, int stats,
            boolean breakOnEx) {
        // Transport
        this.channel = channel;

        // Sender behaviour
        this.send = send;
        this.msgCount = msgCount;
        this.pause = pause;
        this.breakonChannelException = breakOnEx;

        // Reporting
        this.statsInterval = stats;
        this.debug = debug;
    }


    /**
     * Sender loop: sends {@link #msgCount} messages to all current channel members.
     * <p>
     * While the channel has no members, or when this instance is not a sender, the thread waits on {@link #mutex}
     * until a member is added. Every {@link #statsInterval} successfully sent messages the local counter is folded
     * into the global total and the send statistics are logged. A non-positive interval disables the periodic
     * report. When the loop ends, normally or because of an exception, the remaining local count is added to the
     * global total and the statistics are logged once more.
     * <p>
     * A {@link ChannelException} is logged and the message is not counted; it only terminates the loop when
     * {@link #breakonChannelException} is set.
     */
    @Override
    public void run() {

        // Messages sent since the last statistics report
        long counter = 0;
        // Messages sent successfully by this thread; this is what the loop bound is checked against
        long total = 0;
        LoadMessage msg = new LoadMessage();

        startTest();
        try {
            while (total < msgCount) {
                if (channel.getMembers().length == 0 || (!send)) {
                    synchronized (mutex) {
                        try {
                            mutex.wait();
                        } catch (InterruptedException x) {
                            log.info("Thread interrupted from wait");
                        }
                    }
                } else {
                    try {
                        if (debug) {
                            printArray(msg.getMessage());
                        }
                        channel.send(channel.getMembers(), msg, channelOptions);
                        // Only count messages that were actually handed to the channel
                        counter++;
                        total++;
                        if (pause > 0) {
                            if (debug) {
                                System.out.println("Pausing sender for " + pause + " ms.");
                            }
                            Thread.sleep(pause);
                        }
                    } catch (ChannelException x) {
                        if (debug) {
                            log.error("Unable to send message:" + x.getMessage(), x);
                        } else {
                            log.error("Unable to send message:" + x.getMessage());
                        }
                        ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
                        for (ChannelException.FaultyMember faultyMember : faulty) {
                            log.error("Faulty: " + faultyMember);
                        }
                        if (this.breakonChannelException) {
                            throw x;
                        }
                    }
                }
                // The interval check comes first so that an interval of 0 cannot cause a division by zero
                if (statsInterval > 0 && counter > 0 && (counter % statsInterval) == 0) {
                    // add to the global counter and reset the local one
                    counter = addSendStats(counter);
                    // print from the global counter
                    printSendStats(messagesSent, messageSize);
                }
            }
        } catch (Exception x) {
            log.error("Captured error while sending:" + x.getMessage());
            if (debug) {
                log.error("", x);
            }
        } finally {
            endTest();
        }

        // Account for the messages sent since the last periodic report and print the final numbers
        if (counter > 0) {
            addSendStats(counter);
        }
        printSendStats(messagesSent, messageSize);
    }


    /**
     * Logs the new member and wakes up every thread waiting on {@link #mutex}, so that sender threads blocked in
     * {@code run()} re-check the channel membership.
     *
     * @param member the member that joined
     */
    @Override
    public void memberAdded(Member member) {
        final String notice = "Member added:" + member;
        log.info(notice);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    /**
     * Logs that a member is gone. No other action is taken.
     *
     * @param member the member that disappeared
     */
    @Override
    public void memberDisappeared(Member member) {
        final String notice = "Member disappeared:" + member;
        log.info(notice);
    }

    /**
     * Accepts byte messages only.
     *
     * @param msg the incoming message
     * @param mbr the sending member (not inspected)
     *
     * @return {@code true} if the message is a {@link ByteMessage}; this includes {@link LoadMessage}, which is a
     *             subclass of {@link ByteMessage}
     */
    @Override
    public boolean accept(Serializable msg, Member mbr) {
        // LoadMessage extends ByteMessage, so a single type check covers both accepted types
        return msg instanceof ByteMessage;
    }

    /**
     * Accounts for a received message and periodically logs the receive statistics.
     * <p>
     * The statistics are logged every {@link #statsInterval} messages (a non-positive interval disables the
     * periodic report) and when {@link #msgCount} messages have been received. Byte totals are taken from the
     * {@code long} accumulator {@link #bytesReceived}, which avoids the {@code int} overflow of multiplying the
     * message length by the message count and stays correct when message sizes differ.
     *
     * @param msg the received message; must be a {@link ByteMessage} (see {@code accept})
     * @param mbr the sending member (not inspected)
     */
    @Override
    public void messageReceived(Serializable msg, Member mbr) {
        if (receiveStart == 0) {
            receiveStart = System.currentTimeMillis();
        }

        // Plain byte messages are wrapped so that the payload is always read through LoadMessage.getMessage()
        LoadMessage load;
        if (msg instanceof LoadMessage) {
            load = (LoadMessage) msg;
        } else {
            load = new LoadMessage();
            load.setMessage(((ByteMessage) msg).getMessage());
        }
        byte[] data = load.getMessage();

        if (debug) {
            printArray(data);
        }

        bytesReceived += data.length;
        // Derived from the exact byte total instead of summing small floats, which accumulates rounding error
        mBytesReceived = bytesReceived / 1024f / 1024f;
        messagesReceived++;

        // The interval check comes first so that an interval of 0 cannot cause a division by zero
        boolean intervalReached = statsInterval > 0 && (messagesReceived % statsInterval) == 0;
        if (intervalReached || (messagesReceived == msgCount)) {
            double seconds = (System.currentTimeMillis() - receiveStart) / 1000d;
            double mBytes = bytesReceived / 1024d / 1024d;
            log.info("****RECEIVE STATS-" + Thread.currentThread().getName() + "*****" + "\n\tMessage count :" +
                    (long) messagesReceived + "\n\tMessage/sec   :" + messagesReceived / seconds +
                    "\n\tTotal bytes   :" + bytesReceived + "\n\tTotal mbytes  :" + (long) mBytes +
                    "\n\tTime since 1st:" + seconds + " seconds" + "\n\tBytes/second  :" +
                    (bytesReceived / seconds) + "\n\tMBytes/second :" + (mBytes / seconds) + "\n");

        }
    }


    /**
     * Dumps a byte array to standard output in the form <code>{b0,b1,...,} size:n</code>, followed by a line
     * break. Every element, including the last one, is followed by a comma.
     *
     * @param data the bytes to print
     */
    public static void printArray(byte[] data) {
        System.out.print("{");
        final StringBuilder rest = new StringBuilder();
        for (int idx = 0; idx < data.length; idx++) {
            rest.append(data[idx]).append(",");
        }
        rest.append("} size:").append(data.length);
        System.out.println(rest.toString());
    }


    /**
     * Byte message used as the load-test payload. Unless a payload is set explicitly, every instance hands out the
     * same shared array of random bytes.
     */
    public static class LoadMessage extends ByteMessage {

        /** Shared default payload; its length is the value of {@code size} when this class is initialised. */
        public static byte[] outdata = new byte[size];
        /** Random source used once to fill {@link #outdata}. */
        public static final Random r = new Random();

        static {
            // Must run after both static fields above have been initialised
            r.nextBytes(outdata);
        }

        /** Payload of this message; initialised through {@link #getMessage()}, i.e. to the shared payload. */
        protected byte[] message = getMessage();

        /** Creates a message that carries the shared default payload. */
        public LoadMessage() {
            // Default constructor
        }

        /**
         * Returns the payload length of the given message.
         *
         * @param msg the message to inspect
         *
         * @return length of the array returned by {@code msg.getMessage()}
         */
        public static int getMessageSize(LoadMessage msg) {
            final byte[] payload = msg.getMessage();
            return payload.length;
        }

        /**
         * @return the payload of this message; if none is set, the shared default payload is stored and returned
         */
        @Override
        public byte[] getMessage() {
            byte[] current = message;
            if (current == null) {
                current = outdata;
                message = current;
            }
            return current;
        }

        /**
         * Replaces the payload of this message.
         *
         * @param data the new payload; {@code null} makes the next {@link #getMessage()} fall back to the shared
         *                 default payload
         */
        @Override
        public void setMessage(byte[] data) {
            message = data;
        }
    }

    /**
     * Prints the command line help to standard output: the options handled by {@code main}, followed by the
     * channel options reported by {@code ChannelCreator.usage()} and a few example invocations.
     */
    public static void usage() {
        System.out.println("Tribes Load tester.");
        System.out.println("The load tester can be used in send or receive mode or both");
        // Every option line ends with "\n\t\t" so that the following option is indented like the others
        System.out.println("Usage:\n\t" +
                "java LoadTest [options]\n\t" +
                "Options:\n\t\t" +
                "[-mode receive|send|both]  \n\t\t" +
                "[-startoptions startflags (default is Channel.DEFAULT) ]  \n\t\t" +
                "[-debug]  \n\t\t" +
                "[-count messagecount]  \n\t\t" +
                "[-stats statinterval]  \n\t\t" +
                "[-pause nrofsecondstopausebetweensends]  \n\t\t" +
                "[-threads numberofsenderthreads]  \n\t\t" +
                "[-size messagesize]  \n\t\t" +
                "[-sendoptions channeloptions]  \n\t\t" +
                "[-break (halts execution on exception)]  \n\t\t" +
                "[-shutdown (issues a channel.stop() command after send is completed)]\n" +
                "\tChannel options:" + ChannelCreator.usage() + "\n\n" +
                "Example:\n\t" +
                "java LoadTest -port 4004\n\t" +
                "java LoadTest -bind 192.168.0.45 -port 4005\n\t" +
                "java LoadTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
    }

    /**
     * Command line entry point. Parses the load-test options, creates the channel from the same argument array
     * through {@code ChannelCreator}, registers a {@code LoadTest} as channel and membership listener, starts the
     * channel and runs the requested number of sender threads (the last one on the calling thread).
     * <p>
     * Arguments that are not load-test options are ignored here because they may be channel options.
     *
     * @param args command line arguments; see {@link #usage()}
     *
     * @throws IllegalArgumentException if an option that requires a value is the last argument, or if the
     *                                      requested message size is smaller than 4 bytes
     * @throws Exception                if an option value cannot be parsed or the channel cannot be created,
     *                                      started or stopped
     */
    public static void main(String[] args) throws Exception {
        boolean send = true;
        boolean debug = false;
        long pause = 0;
        int count = 1000000;
        int stats = 10000;
        boolean breakOnEx = false;
        int threads = 1;
        boolean shutdown = false;
        int startoptions = Channel.DEFAULT;
        int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
        if (args.length == 0) {
            args = new String[] { "-help" };
        }
        for (int i = 0; i < args.length; i++) {
            String option = args[i];
            if ("-threads".equals(option)) {
                threads = Integer.parseInt(optionValue(args, ++i));
            } else if ("-count".equals(option)) {
                count = Integer.parseInt(optionValue(args, ++i));
                System.out.println("Sending " + count + " messages.");
            } else if ("-pause".equals(option)) {
                // The option is given in seconds, the sender sleeps in milliseconds
                pause = Long.parseLong(optionValue(args, ++i)) * 1000;
            } else if ("-break".equals(option)) {
                breakOnEx = true;
            } else if ("-shutdown".equals(option)) {
                shutdown = true;
            } else if ("-stats".equals(option)) {
                stats = Integer.parseInt(optionValue(args, ++i));
                System.out.println("Stats every " + stats + " message");
            } else if ("-sendoptions".equals(option)) {
                channelOptions = Integer.parseInt(optionValue(args, ++i));
                System.out.println("Setting send options to " + channelOptions);
            } else if ("-startoptions".equals(option)) {
                startoptions = Integer.parseInt(optionValue(args, ++i));
                System.out.println("Setting start options to " + startoptions);
            } else if ("-size".equals(option)) {
                int requested = Integer.parseInt(optionValue(args, ++i));
                // 4 bytes are subtracted from the requested size, so anything smaller gives a negative array length
                if (requested < 4) {
                    throw new IllegalArgumentException("Message size must be at least 4 bytes: " + requested);
                }
                size = requested - 4;
                System.out.println("Message size will be:" + (size + 4) + " bytes");
            } else if ("-mode".equals(option)) {
                if ("receive".equals(optionValue(args, ++i))) {
                    send = false;
                }
            } else if ("-debug".equals(option)) {
                debug = true;
            } else if ("-help".equals(option)) {
                usage();
                System.exit(1);
            }
        }

        ManagedChannel channel = (ManagedChannel) ChannelCreator.createChannel(args);

        LoadTest test = new LoadTest(channel, send, count, debug, pause, stats, breakOnEx);
        test.channelOptions = channelOptions;
        LoadMessage msg = new LoadMessage();

        messageSize = LoadMessage.getMessageSize(msg);
        channel.addChannelListener(test);
        channel.addMembershipListener(test);
        channel.start(startoptions);
        Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
        // All but one sender run on daemon threads; the last instance runs on the calling thread below
        while (threads > 1) {
            Thread t = new Thread(test);
            t.setDaemon(true);
            t.start();
            threads--;
            test = new LoadTest(channel, send, count, debug, pause, stats, breakOnEx);
            test.channelOptions = channelOptions;
        }
        test.run();
        if (shutdown && send) {
            channel.stop(Channel.DEFAULT);
        }
        System.out.println("System test complete, sleeping to let threads finish.");
        Thread.sleep(60 * 1000 * 60);
    }

    /**
     * Returns the value that follows an option on the command line.
     *
     * @param args  the command line arguments
     * @param index position at which the value is expected (the option itself is at {@code index - 1})
     *
     * @return the argument at {@code index}
     *
     * @throws IllegalArgumentException if the option was the last argument, after printing the usage
     */
    private static String optionValue(String[] args, int index) {
        if (index >= args.length) {
            usage();
            throw new IllegalArgumentException("Missing value for option " + args[index - 1]);
        }
        return args[index];
    }

    /**
     * Shutdown hook that stops the channel and makes sure the JVM terminates even if stopping the channel hangs.
     */
    public static class Shutdown extends Thread {
        /** Maximum time in milliseconds granted to the channel shutdown before the JVM is halted. */
        private static final long HALT_DELAY = 5000;

        ManagedChannel channel = null;

        /**
         * @param channel the channel to stop when the hook runs
         */
        public Shutdown(ManagedChannel channel) {
            this.channel = channel;
        }

        /**
         * Starts a daemon watchdog that halts the JVM after {@link #HALT_DELAY} milliseconds, then stops the
         * channel. Failures while stopping are printed and otherwise ignored.
         * <p>
         * The watchdog uses {@code Runtime.halt}: {@code System.exit} blocks indefinitely when it is called while
         * shutdown hooks are running, so it cannot force termination from inside a hook.
         */
        @Override
        public void run() {
            System.out.println("Shutting down...");
            Thread watchdog = new Thread("LoadTest-shutdown-watchdog") {
                @Override
                public void run() {
                    try {
                        Thread.sleep(HALT_DELAY);
                    } catch (InterruptedException x) {
                        // Interrupted before the deadline: do not force termination
                        return;
                    }
                    Runtime.getRuntime().halt(0);
                }
            };
            watchdog.setDaemon(true);
            watchdog.start();
            try {
                channel.stop(Channel.DEFAULT);

            } catch (Exception x) {
                x.printStackTrace();
            }
            System.out.println("Channel stopped.");
        }
    }

    /**
     * Thread that sleeps for a fixed time and then calls {@code System.exit(0)}.
     */
    public static class SystemExit extends Thread {
        /** Time to sleep before exiting, in milliseconds. */
        private long delay;

        /**
         * @param delay time in milliseconds to sleep before {@code System.exit(0)} is called
         */
        public SystemExit(long delay) {
            this.delay = delay;
        }

        /**
         * Sleeps for the configured delay and then exits. A failure while sleeping is printed and does not prevent
         * the exit call.
         */
        @Override
        public void run() {
            final long millis = this.delay;
            try {
                Thread.sleep(millis);
            } catch (Exception failure) {
                failure.printStackTrace();
            }
            System.exit(0);
        }
    }

}